Skip to content

LangGraph AI Agent 核心模块

本文介绍使用 LangGraph 开发 AI Agent 应用时必知必会的核心模块和概念。

架构全景

┌─────────────────────────────────────────────────────────────────────────┐
│                    LangGraph AI Agent 核心架构                     │
├─────────────────────────────────────────────────────────────────────────┤
│  基础构建层        │  执行控制层          │  高级特性层                        │
├─────────────────────────────────────────────────────────────────────────┤
│ • State (状态)      │ • Edges (边)           │ • Memory (短期/长期)              │
│ • Nodes (节点)      │ • Conditional Edges  │ • Interrupts (中断/人机协作)  │
│ • Graph (图)         │ • Command (命令)      │ • Subgraphs (子图)              │
│ • Tools (工具)      │ • Recursion (递归)      │ • Parallel (并行执行)            │
│                    │                    │ • Streaming (流式输出)            │
└─────────────────────────────────────────────────────────────────────────┘

一、基础构建层

1.1 State(状态)

定义图执行过程中的数据结构,是节点间传递的核心载体。

python
from typing import TypedDict, Annotated
from operator import add

class AgentState(TypedDict):
    messages: Annotated[list, add]  # 使用 reducer 合并消息
    context: str

核心概念:

  • TypedDict: 类型化的状态定义
  • Annotated: 带注解的字段,可附加 reducer 函数
  • Reducer: 用于合并状态更新的函数(如 operator.add

1.2 Nodes(节点)

执行具体业务逻辑的函数,每个节点接收状态并返回状态更新。

python
def call_model(state: AgentState) -> AgentState:
    """调用 LLM 生成响应"""
    response = model.invoke(state["messages"])
    return {"messages": [response]}

def tool_node(state: AgentState) -> AgentState:
    """执行工具调用"""
    tool_calls = state["messages"][-1].tool_calls
    results = [execute_tool(call) for call in tool_calls]
    return {"messages": results}

节点类型:

  • 普通节点: 执行业务逻辑
  • 工具节点: 执行工具调用
  • 条件节点: 根据状态决定下一步

1.3 Graph(图)

组织节点和边的工作流容器。

python
from langgraph.graph import StateGraph, START, END

builder = StateGraph(AgentState)
builder.add_node("agent", agent_node)
builder.add_node("tools", tool_node)
builder.add_edge(START, "agent")
builder.add_edge("agent", "tools")
builder.add_edge("tools", END)
graph = builder.compile()

图类型:

  • StateGraph: 最常用的图,支持状态管理
  • MessageGraph: 简化的消息处理图
  • 编译后的图: 调用 compile() 生成可执行图

1.4 Tools(工具)

扩展 LLM 能力的外部函数,可被模型调用。

python
from langchain.tools import tool

@tool
def search_web(query: str) -> str:
    """搜索网络获取信息"""
    # 实现搜索逻辑
    return search_results

@tool
def get_weather(city: str) -> dict:
    """获取指定城市的天气"""
    # 实现天气查询逻辑
    return {"temperature": 25, "condition": "sunny"}

工具特性:

  • 使用 @tool 装饰器定义
  • 自动生成 JSON Schema
  • 支持参数验证
  • 可绑定到 LLM

二、执行控制层

2.1 Edges(边)

定义节点之间的连接关系,控制执行流程。

python
# 直接边
builder.add_edge("node_a", "node_b")

# 起始边
builder.add_edge(START, "first_node")
# 结束边
builder.add_edge("last_node", END)

边类型:

  • 普通边: 固定连接 node_a -> node_b
  • 起始边: START -> node
  • 结束边: node -> END

2.2 Conditional Edges(条件边)

根据状态动态决定下一步执行哪个节点。

python
def should_continue(state: AgentState) -> Literal["tools", END]:
    """判断是否需要调用工具"""
    if state["messages"][-1].tool_calls:
        return "tools"
    return END

builder.add_conditional_edges(
    "agent",
    should_continue,
    {"tools": "tools", END: END}
)

条件边特性:

  • 误路由函数返回下一个节点名称
  • 支持多个目标节点
  • 可返回 END 结束执行

2.3 Command(命令)

精确控制图的执行流程,包括跳转和状态更新。

python
from langgraph.types import Command

def routing_node(state: State) -> Command[Literal["node_a", "node_b"]]:
    if state["should_retry"]:
        return Command(
            goto="node_a",
            update={"retry_count": state["retry_count"] + 1}
        )
    return Command(goto="node_b")

Command 能力:

  • goto: 指定下一个执行的节点
  • update: 更新状态
  • resume: 恢复中断的执行(用于人机协作)

2.4 Recursion(递归)

实现循环或迭代逻辑,让图可以自我调用。

python
def recursive_node(state: State) -> Command[Literal["recursive_node", END]]:
    if state["count"] < state["max_count"]:
        return Command(
            goto="recursive_node",
            update={"count": state["count"] + 1}
        )
    return Command(goto=END)

递归特性:

  • 支持自我循环
  • 可设置最大迭代次数
  • 结合 Command 使用
  • 适合需要重试或迭代的场景

三、高级特性层

3.1 Memory(记忆)

详见 LangGraph Memory 架构详解

核心组件:

  • Checkpointer: 皂停/恢复、时间旅行
  • Store: 跨会话长期记忆
  • Trimming: 消息裁剪
  • Summarization: 对话摘要

3.2 Interrupts(中断/人机协作)

暂停执行等待人工输入或审批。

python
from langgraph.types import interrupt
from langgraph.checkpoint.memory import MemorySaver

def approval_node(state: State):
    # 暂停并等待人工审批
    decision = interrupt({
        "question": "是否批准此操作? Please approve or reject.",
        "details": state["action_details"]
    })
    return {"approved": decision}

builder = StateGraph(ApprovalState)
builder.add_node("approval", approval_node)
builder.add_edge(START, "approval")
builder.add_edge("approval", END)

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

中断特性:

  • 使用 interrupt() 函数暂停执行

  • 中断信息会出现在 __interrupts__ 字段中

  • 使用 Command(resume=value) 恢复执行

  • 需要 Checkpointer 支持

  • 适合需要人工审批的场景(如发送邮件、执行危险操作等)

完整示例:

python
# 启动执行(会中断)
result = graph.invoke({"action_details": "发送邮件给 user@example.com"}, config)
print(result["__interrupts__"])  # 查看中断信息

# 恢复执行(批准)
result = graph.invoke(Command(resume=True), config)  # 批准
# 恢复执行(拒绝)
result = graph.invoke(Command(resume=False), config)  # 拒绝

3.3 Subgraphs(子图)

将复杂逻辑拆分为可复用的子模块,实现图的嵌套。

python
# 子图定义
class ChildState(TypedDict):
    child_value: str

def child_node(state: ChildState) -> ChildState:
    return {"child_value": f"processed: {state['child_value']}"}

child_builder = StateGraph(ChildState)
child_builder.add_node("child_node", child_node)
child_builder.add_edge(START, "child_node")
child_builder.add_edge("child_node", END)
child_graph = child_builder.compile()

# 父图中调用子图
class ParentState(TypedDict):
    parent_value: str
    child_result: str

def call_child(state: ParentState) -> ParentState:
    result = child_graph.invoke({"child_value": state["parent_value"]})
    return {"child_result": result["child_value"]}

子图特性:

  • 支持多层嵌套
  • 子图有独立的状态空间
  • 父子图通过 invoke 通信
  • 可传递和转换状态
  • 实现模块化和复用

3.4 Parallel(并行执行)

同时执行多个独立任务并聚合结果。

python
from operator import add
from typing import Annotated

class ParallelState(TypedDict):
    topic: str
    results: Annotated[list, add]  # 使用 reducer 合并结果

def task_a(state: ParallelState) -> ParallelState:
    result = model.invoke(f"写关于 {state['topic']} 的笑话")
    return {"results": [{"type": "joke", "content": result.content}]}

def task_b(state: ParallelState) -> ParallelState:
    result = model.invoke(f"写关于 {state['topic']} 的诗歌")
    return {"results": [{"type": "poem", "content": result.content}]}

builder = StateGraph(ParallelState)
builder.add_node("task_a", task_a)
builder.add_node("task_b", task_b)
builder.add_node("aggregator", aggregator_node)
# 并行边: START 同时指向两个节点
builder.add_edge(START, "task_a")
builder.add_edge(START, "task_b")
# 汇聚边: 两个节点都指向聚合节点
builder.add_edge("task_a", "aggregator")
builder.add_edge("task_b", "aggregator")
builder.add_edge("aggregator", END)

并行特性:

  • 同一步骤中的节点并行执行
  • 使用 Annotated + reducer 合并结果
  • 适合独立可并行的任务
  • 显著提升性能

注意事项:

  • 并行节点的状态更新顺序不确定
  • 黙需要使用 reducer 确保正确合并
  • 聚合节点需要等待所有并行节点完成

3.5 Streaming(流式输出)

实时输出执行过程,支持多种输出模式。

python
# 流式输出示例
for chunk in graph.stream(
    {"messages": [{"role": "user", "content": "你好"}]},
    config,
    stream_mode="values"  # 输出模式
):
    print(chunk)

流式输出模式:

模式说明用途
"values"每个步骤后的完整状态查看完整状态变化
"updates"状态增量更新细粒度状态追踪
"messages"LLM 输出的消息流实时显示 LLM 响应
"debug"调试信息开发调试
"custom"自定义输出特殊需求

Token 级流式输出:

python
# 按 Token 流式输出
for chunk in graph.stream(
    input,
    config,
    stream_mode="messages",
    version="v2",  # 使用 v2 版本
):
    if "messages" in chunk:
        for token in chunk["messages"]:
            print(token.content, end="", flush=True)

四、模块选择指南

4.1 猏的 Agent

使用 LangGraph 快速构建标准 Agent。

python
from langgraph.prebuilt import create_react_agent

agent = create_react_agent(
    model=model,
    tools=[search_web, get_weather],
    checkpointer=checkpointer
)

适用场景:

  • 快速原型开发
  • 标准 ReAct 模式
  • 简单的 Agent 需求

4.2 自定义 Agent

完全控制 Agent 的每个环节。

适用场景:

  • 复杂业务逻辑
  • 需要精确控制流程
  • 特殊的状态管理需求

4.3 騡块组合建议

场景推荐模块组合
癮通对话 AgentState + Nodes + Graph + Memory
工具调用 Agent+ Tools + Conditional Edges + Memory
需人机协作+ Interrupts + Checkpointer
复杂工作流+ Subgraphs + Parallel
长时间运行+ Store + Streaming
生产部署Memory + Interrupts + Streaming

五、相关文档